# Source code for hysop.domain.domain

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Abstract interfaces for physical domains description.
* :class:`~hysop.domain.domain.Domain`
* :class:`~hysop.domain.domain.DomainView`
"""
import hashlib
import numpy as np
from abc import ABCMeta, abstractmethod

from hysop.constants import HYSOP_DEFAULT_TASK_ID, HYSOP_DIM, HYSOP_INTEGER
from hysop.core.mpi import main_comm, MPI
from hysop.tools.parameters import MPIParams
from hysop.tools.decorators import debug
from hysop.tools.handle import RegisteredObject, TaggedObjectView
from hysop.tools.htypes import check_instance
from hysop.tools.numpywrappers import npw
from hysop.symbolic.frame import SymbolicFrame


class DomainView(TaggedObjectView, metaclass=ABCMeta):
    """Abstract base class for views on domains.

    A view couples a :class:`Domain` with a topology state. Almost every
    accessor below simply forwards to the private attributes stored on the
    viewed domain.
    """

    __slots__ = ("_domain", "_topology_state")

    @debug
    def __init__(self, topology_state, domain=None, **kwds):
        """Initialize the parent view with ``domain`` as the viewed object.

        ``topology_state`` is accepted so that the signature matches
        :meth:`__new__`, which is where it is stored.
        """
        super().__init__(obj_view=domain, **kwds)

    @debug
    def __new__(cls, topology_state, domain=None, **kwds):
        """Create and initialize a DomainView.

        Parameters
        ----------
        topology_state : :class:`~hysop.topology.topology.TopologyState`
            Topology state altering this domain view.
        domain : :class:`Domain`, optional
            Domain the view is built on. When omitted, the newly created
            object must itself be a :class:`Domain` and is used as the domain.
        kwds : dict
            Extra keyword arguments forwarded to the parent ``__new__``.
        """
        from hysop.topology.topology import TopologyState

        check_instance(topology_state, TopologyState)
        check_instance(domain, Domain, allow_none=True)
        obj = super().__new__(cls, obj_view=domain, **kwds)
        # Explicit None test: the view must not depend on the truthiness of
        # the domain object.
        if domain is None:
            domain = obj
        check_instance(domain, Domain)
        obj._domain = domain
        obj._topology_state = topology_state
        return obj

    def _get_domain(self):
        """Return the domain on which the view is on."""
        return self._domain

    def _get_topology_state(self):
        """Return the topology state altering this domain view."""
        return self._topology_state

    def _get_dim(self):
        """Return the dimension of the domain."""
        return self._domain._dim

    def _get_parent_comm(self):
        """Return the parent communicator used to create this domain."""
        return self._domain._parent_comm

    def _get_parent_rank(self):
        """Return the rank of the process in the parent communicator."""
        return self._domain._parent_rank

    def task_intercomm(self, task_id):
        """
        Return the communicator linking the current process to task ``task_id``.

        For disjoint tasks this is an MPI intercommunicator; for nested tasks
        it is the communicator of the largest (enclosing) task.
        Raises KeyError if no such communicator was built on this process.
        """
        return self._domain._task_intercomm[task_id]

    def _get_has_tasks(self):
        """Return True if the domain contains 2 tasks or more."""
        return self._domain._has_tasks

    def _get_all_tasks(self):
        """Return the set of all task identifiers."""
        return self._domain._all_tasks

    def task_root_in_parent(self, task_id):
        """Return the rank of the root process of ``task_id`` in the parent communicator."""
        return self._domain._task_root_in_parent[task_id]

    def _get_machine_comm(self):
        """
        Return the communicator that owns the current process.

        This is the sub-communicator which has been obtained by splitting
        the parent communicator by machine name.
        """
        return self._domain._machine_comm

    def _get_machine_rank(self):
        """Return the rank of the process in the machine communicator."""
        return self._domain._machine_rank

    def _get_proc_tasks(self):
        """Return mapping between mpi process rank and task identifier."""
        return self._domain._proc_tasks

    def _get_registered_topologies(self):
        """
        Return the dictionary of all topologies already built on this domain,
        with topology ids as keys and :class:`~hysop.topology.topology.Topology`
        as values.
        """
        return self._domain._registered_topologies

    def _get_frame(self):
        """Get symbolic frame associated to this domain."""
        return self._domain._frame

    def task_on_proc(self, parent_rank):
        """Get task identifier(s) for a given mpi process (parent communicator rank).

        Raises
        ------
        ValueError
            If ``parent_rank`` is not a valid rank of the parent communicator.
            Negative ranks are rejected too; otherwise numpy negative indexing
            would silently return the tasks of another process.
        """
        proc_tasks = self._domain._proc_tasks
        if not 0 <= parent_rank < len(proc_tasks):
            msg = f"Unknown rank {parent_rank} in parent communicator."
            raise ValueError(msg)
        return proc_tasks[parent_rank]

    def current_task(self):
        """Get task number of the current mpi process.

        In case of multiple tasks on this process, the first one is returned
        (the first entry of this process's task list).
        """
        t = self.task_on_proc(self._domain._parent_rank)
        try:
            return t[0]
        except (IndexError, TypeError):
            # Scalar task id: numpy scalars raise IndexError when indexed,
            # plain Python ints raise TypeError.
            return t

    def current_task_list(self):
        """Get the task numbers of the current mpi process as a sequence.

        Entries that already are sequences (list, tuple or numpy array) are
        returned as-is; a scalar task id is wrapped in a one-element list.
        """
        t = self.task_on_proc(self._domain._parent_rank)
        if isinstance(t, (list, tuple, np.ndarray)):
            return t
        return [t]

    def get_task_comm(self, task_id=None):
        """
        Return the communicator that owns the current process.

        This is the sub-communicator which has been obtained by splitting
        the parent communicator by colors (proc_tasks). ``task_id`` defaults
        to :meth:`current_task`. Returns None if the current process does not
        belong to the given task.
        """
        if task_id is None:
            task_id = self.current_task()
        return self._domain._task_comm.get(task_id)

    def _get_task_comm(self):
        """Return the task communicator of the current task."""
        return self.get_task_comm()

    def task_rank(self, task_id=None):
        """Return the rank of the process in the task communicator.

        ``task_id`` defaults to :meth:`current_task`. Returns None if the
        current process does not belong to the given task.
        """
        if task_id is None:
            task_id = self.current_task()
        return self._domain._task_rank.get(task_id)

    def _is_task_matters(self, tid, proctasks):
        """Return whether task ``tid`` matters for the proc_tasks entry ``proctasks``."""
        return self._domain._is_task_matters(tid, proctasks)

    def is_on_task(self, params):
        """Test if the current process corresponds to param task.

        ``params`` is either an :class:`MPIParams` (its ``task_id`` is used)
        or an integer task id. Raises TypeError for any other type.
        """
        if isinstance(params, MPIParams):
            task_id = params.task_id
        elif isinstance(params, (int, npw.integer)):
            task_id = params
        else:
            msg = "Could not extract task_id from type {}."
            msg = msg.format(type(params))
            raise TypeError(msg)
        return self._is_task_matters(
            task_id, self.task_on_proc(self._domain._parent_rank)
        )

    def print_topologies(self):
        """Print all topologies registered on the domain."""
        print(self.short_description() + " defined the following topologies:")
        for topo in self._domain._registered_topologies.values():
            print(" *" + topo.short_description())

    @abstractmethod
    def short_description(self):
        """Return a short description of this domain as a string."""
        pass

    @abstractmethod
    def long_description(self):
        """Return a long description of this domain as a string."""
        pass

    def __eq__(self, other):
        # Two views are equal when they view the very same domain object
        # with equal topology states.
        if not isinstance(other, DomainView):
            return NotImplemented
        eq = self._domain is other._domain
        eq &= self._topology_state == other._topology_state
        return eq

    def __ne__(self, other):
        eq = self.__eq__(other)
        if eq is NotImplemented:
            return NotImplemented
        return not eq

    def __hash__(self):
        return id(self._domain) ^ hash(self._topology_state)

    def __str__(self):
        """Equivalent to self.long_description()"""
        return self.long_description()

    def tasks_overlapping(self, ta, tb):
        """Return the task whose communicator is shared by tasks ``ta`` and ``tb``.

        The value is None when no shared (enclosing) task was recorded for the
        pair on this process. Raises KeyError when ``ta == tb`` or either task
        is unknown.
        """
        return self._domain._overlapping_map[ta][tb]

    domain = property(_get_domain)
    dim = property(_get_dim)
    proc_tasks = property(_get_proc_tasks)
    parent_comm = property(_get_parent_comm)
    task_comm = property(_get_task_comm)
    parent_rank = property(_get_parent_rank)
    has_tasks = property(_get_has_tasks)
    all_tasks = property(_get_all_tasks)
    machine_comm = property(_get_machine_comm)
    machine_rank = property(_get_machine_rank)
    registered_topologies = property(_get_registered_topologies)
    frame = property(_get_frame)
class Domain(RegisteredObject, metaclass=ABCMeta):
    """Abstract base class for the description of physical domains."""

    @debug
    def __init__(self, dim, parent_comm=None, proc_tasks=None, **kwds):
        """Forward the constructor arguments, as given by the caller, to the
        parent initializer with the ``"d"`` tag prefix."""
        super().__init__(
            dim=dim,
            parent_comm=parent_comm,
            proc_tasks=proc_tasks,
            tag_prefix="d",
            **kwds,
        )

    @debug
    def __new__(cls, dim, parent_comm=None, proc_tasks=None, **kwds):
        """
        Create or get an existing physical domain of given dim on a specified
        MPI communicator and specific tasks.

        Parameters
        ----------
        dim : int
            Dimension of the domain.
        parent_comm : MPI.Intracomm, optional
            Parent communicator which may be split.
            If not given this will be hysop.core.mpi.main_comm.
        proc_tasks : tuple of ints or tuples of int, optional
            Mapping between mpi process rank and task identifier.
            If not given all procs will be on task HYSOP_DEFAULT_TASK_ID.
            Must have one entry per process of parent_comm.

        Attributes
        ----------
        dim : int
            Dimension of the domain.
        proc_tasks : numpy.ndarray (read-only)
            Mapping between mpi process rank and task identifier: a flat
            integer array when each proc has a single task, an object array
            of per-proc task arrays otherwise.
        parent_comm: MPI.Intracomm
            Return the parent communicator used to create this domain.
        parent_rank: int
            Return the rank of the process in the parent communicator.
        task_comm : MPI.IntraComm
            Return the communicator that owns the current process.
            This is the sub-communicator which has been obtained by splitting
            the parent communicator by colors (proc_tasks).
        task_rank: int
            Return the rank of the process in the task communicator.
        registered_topologies : dict
            Dictionary of all topologies already built on this domain with
            topology ids as keys and :class:`~hysop.topology.topology.Topology`
            as values.

        Notes
        -----
        * Parent communicator is split/subgrouped according to proc_tasks.
        * About MPI Tasks:
          proc_tasks[n] = 12 means that task 12 owns proc n,
          or equivalently that proc n is dedicated to task 12.
          proc_tasks[n] = (12, 13) means that proc n is dedicated to both
          tasks 12 and 13.
        * Examples of supported mapping:
            - None or [1,1,1,1] : Single task (nothing more to do)
            - [1,2,2,2] : disjoint tasks (two task_comm created by
              Comm_Split and one intercommunicator for each other task)
            - [(1,2), (1,2), (2,), (2,)] : nested tasks (use the largest
              task intracommunicator as inter-task communication)
            - [(1,), (1,2), (2,), (2,)] : non zero intersection
              (Not handled yet)
            - [(1,2), (1,), (2,), (2,)] : non zero intersection with same
              leader (Not handled yet)
        * A dupped parent_comm will return another independent domain
          instance, because MPI communicators are hashed through their
          python object id.
        """
        dim = int(dim)
        parent_comm = parent_comm or main_comm
        check_instance(proc_tasks, tuple, values=(int, tuple, list), allow_none=True)
        proc_tasks = proc_tasks or [(HYSOP_DEFAULT_TASK_ID,)] * parent_comm.Get_size()
        assert len(proc_tasks) == parent_comm.Get_size(), f"{proc_tasks}"
        assert all([type(_) is type(proc_tasks[0]) for _ in proc_tasks])

        # Sort tasks and flatten if single task per proc.
        try:
            # Tasks are sorted on each proc according to task size
            # (number of procs involved in the task), largest first.
            def s_proc_tasks(pt):
                return tuple(
                    sorted(
                        pt, key=lambda t: sum(t in _ for _ in proc_tasks), reverse=True
                    )
                )

            proc_tasks = npw.asarray(
                [
                    npw.asarray(s_proc_tasks(pt), dtype=HYSOP_INTEGER)
                    for pt in proc_tasks
                ],
                dtype=object,
            )
            if all([len(_) == 1 for _ in proc_tasks]):
                proc_tasks = npw.asarray(
                    [_[0] for _ in proc_tasks], dtype=HYSOP_INTEGER
                )
        except TypeError:
            # Integer entries are not iterable: exactly one task per proc.
            assert type(proc_tasks[0]) is int
            proc_tasks = npw.asarray(proc_tasks, dtype=HYSOP_INTEGER)
        npw.set_readonly(proc_tasks)

        # double check types, to be sure RegisteredObject will work as expected
        check_instance(dim, int)
        check_instance(parent_comm, MPI.Intracomm)

        obj = super().__new__(
            cls,
            dim=dim,
            parent_comm=parent_comm,
            proc_tasks=proc_tasks,
            tag_prefix="d",
            **kwds,
        )
        # Registered objects may be returned already initialized; only build
        # the communicators once.
        if not obj.obj_initialized:
            obj.__initialize(dim, parent_comm, proc_tasks)
        return obj

    @debug
    def __initialize(self, dim, parent_comm, proc_tasks):
        """Build the task communicators and task bookkeeping of a new domain.

        Parameters
        ----------
        dim : int
            Dimension of the domain.
        parent_comm : MPI.Intracomm
            Communicator that is split/subgrouped into task communicators.
        proc_tasks : numpy.ndarray
            Read-only mapping prepared by :meth:`__new__`: a flat integer array
            (one task per proc) or an object array of per-proc task arrays.
        """
        parent_rank = parent_comm.Get_rank()
        parent_size = parent_comm.Get_size()

        # is_task_matters(t, pt): return if task t matters on proc_tasks item pt.
        try:
            # proc_tasks contains iterables instead of ints
            all_tasks = {t for _ in proc_tasks for t in _}
            # Check for nested tasks: 1 task is containing all procs
            msg = "Non nested tasks are not handled yet (given proc tasks : {})".format(
                proc_tasks
            )
            assert any([all([t in _ for _ in proc_tasks]) for t in all_tasks]), msg
            # Check for all tasks sharing the same root process
            all_tasks_roots = [
                next(i for i, _ in enumerate(proc_tasks) if t in _) for t in all_tasks
            ]
            msg = "Nested tasks are not sharing the same root (given proc tasks : {}, root index : {})".format(
                proc_tasks, dict(zip(all_tasks, all_tasks_roots))
            )
            assert all(next(iter(all_tasks_roots)) == _ for _ in all_tasks_roots), msg

            def is_task_matters(t, pt):
                return t in pt

        except TypeError:
            # proc_tasks contains integers as task ids
            all_tasks = set(proc_tasks)

            def is_task_matters(t, pt):
                return t == pt

        assert (
            len(all_tasks) <= 2
        ), "Tasks intercommunicator has not been tested with 3 tasks or nore"

        if len(all_tasks) == 1:
            task_comm = {next(iter(all_tasks)): parent_comm.Dup()}
        else:
            assert len(proc_tasks) == parent_size
            if all([isinstance(_, HYSOP_INTEGER) for _ in proc_tasks]):
                # Single task per proc : need comm split
                task_comm = {
                    proc_tasks[parent_rank]: parent_comm.Split(
                        color=proc_tasks[parent_rank], key=parent_rank
                    )
                }
            else:
                # Multiple tasks per proc : need MPI groups
                parent_group = parent_comm.Get_group()
                ranks_tasks = {
                    t: [i for i, _ in enumerate(proc_tasks) if t in _]
                    for t in all_tasks
                }
                task_comm = {}
                for t in all_tasks:
                    task_group = parent_group.Incl(ranks_tasks[t])
                    # Processes outside task_group get MPI.COMM_NULL back.
                    task_comm[t] = parent_comm.Create_group(task_group)
                    # The group is no longer needed once the communicator exists.
                    task_group.Free()
                parent_group.Free()
                # Remove null communicators
                task_comm = {t: c for t, c in task_comm.items() if c != MPI.COMM_NULL}

        # local ranks in tasks
        task_rank = {t: c.Get_rank() for t, c in task_comm.items()}

        # Build the root rank of each task on all processes: the root of task t
        # is the parent rank whose rank inside task t's communicator is 0.
        task_root_in_parent = {}
        all_task_ranks = parent_comm.allgather(task_rank)
        for t in all_tasks:
            for i, r in enumerate(all_task_ranks):
                if is_task_matters(t, proc_tasks[i]) and r.get(t) == 0:
                    task_root_in_parent[t] = i
                    break

        # Create intercommunicators from current task to others
        task_intercomm = {}
        # task overlapping map : gives the largest task of two overlapping tasks
        overlapping_map = {
            ta: {tb: None for tb in all_tasks if ta != tb} for ta in all_tasks
        }
        # For all tasks the current rank is involved in
        my_tasks = tuple(
            t for t in all_tasks if is_task_matters(t, proc_tasks[parent_rank])
        )
        for tsource in my_tasks:
            for tdest in (t for t in all_tasks if t != tsource):
                # tsource is one of this process's tasks, so the remote leader
                # is always the root of the destination task.
                remote_leader = task_root_in_parent[tdest]
                intercomm = None
                if remote_leader != task_root_in_parent[tsource]:
                    # Disjoint tasks
                    intercomm = task_comm[tsource].Create_intercomm(
                        0, parent_comm, remote_leader
                    )
                elif any([all([t in _ for _ in proc_tasks]) for t in all_tasks]):
                    # TODO: review if nested tasks with different ranks;
                    # for the moment ranks are assumed identical through all
                    # local tasks.
                    # If nested tasks, we use the largest task communicator
                    largest_task = [
                        t for t in all_tasks if all([t in _ for _ in proc_tasks])
                    ][0]
                    intercomm = task_comm[largest_task]
                    overlapping_map[tsource][tdest] = largest_task
                    overlapping_map[tdest][tsource] = largest_task
                else:
                    raise NotImplementedError()
                task_intercomm[tdest] = intercomm

        # Build a per-machine communicator in order to get a rank on local
        # machines. Split according to the md5 hash of the machine name,
        # reduced to a non-negative int32 color (machine names generally
        # differ only by a single character, hence hashing).
        processor_name = MPI.Get_processor_name()
        machine_color = np.int32(
            int(hashlib.md5(processor_name.encode("utf-8")).hexdigest(), 16)
            % np.iinfo(np.int32).max
        )
        machine_comm = parent_comm.Split(color=machine_color, key=parent_rank)
        machine_rank = machine_comm.Get_rank()

        self._dim = dim
        self._parent_comm = parent_comm
        self._parent_rank = parent_rank
        self._is_task_matters = is_task_matters
        self._task_rank = task_rank
        self._task_comm = task_comm
        self._all_tasks = all_tasks
        self._task_root_in_parent = task_root_in_parent
        self._task_intercomm = task_intercomm
        self._has_tasks = len(all_tasks) > 1
        self._machine_comm = machine_comm
        self._machine_rank = machine_rank
        self._proc_tasks = proc_tasks
        self._registered_topologies = {}
        self._frame = SymbolicFrame(dim=dim)
        self._overlapping_map = overlapping_map

    def register_topology(self, topo):
        """Register a new topology on this domain.

        Do nothing if a topology with the same id is already registered; in
        that case it must be the very same object (asserted).
        """
        from hysop.topology.topology import Topology

        check_instance(topo, Topology)
        topo_id = topo.id
        if topo_id in self._registered_topologies:
            assert topo is self._registered_topologies[topo_id]
        else:
            self._registered_topologies[topo_id] = topo

    def remove_topology(self, topo):
        """
        Remove a topology from the list of this domain.

        Do nothing if the topology does not exist in the list.
        Return the id of the removed topology, or -1 if nothing was removed.
        """
        from hysop.topology.topology import Topology

        check_instance(topo, Topology)
        topo_id = topo.id
        if topo_id in self._registered_topologies:
            self._registered_topologies.pop(topo_id)
        else:
            topo_id = -1
        return topo_id

    @abstractmethod
    def view(self, topology_state):
        """Return a view of this domain altered by some topology_state."""
        pass